package org.databandtech.flink.demo;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.databandtech.flink.keyselector.Tuple2Selector;
import org.databandtech.flink.reducefunc.AddReduce;
import org.databandtech.flink.source.TimerSource;

/**
 * Demo job that runs a keyed reduction over a stream of
 * {@code Tuple2<String, Integer>} records and prints the reduced stream.
 *
 * <p>Records are produced by a {@link TimerSource}, keyed with a
 * {@link Tuple2Selector}, combined with an {@link AddReduce}, and written
 * out through {@code print()}.
 */
public class ReduceByKey {
	/** Fruit names (apple, pear, watermelon, grape, dragon fruit) handed to the source. */
	private static final String[] TYPE = { "苹果", "梨", "西瓜", "葡萄", "火龙果" };

	/**
	 * Assembles the streaming pipeline and submits it for execution.
	 *
	 * @param args command-line arguments; not read by this job
	 * @throws Exception if the job cannot be executed
	 */
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Custom data source. Per the original author's note it emits one order
		// record {product name, quantity} every second.
		TimerSource<String> orderSource = new TimerSource<String>(TYPE, 1);
		DataStreamSource<Tuple2<String, Integer>> orders = env.addSource(orderSource, "randomsource");

		// An anonymous SourceFunction could be used instead of TimerSource:
		//
		//   env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
		//
		//       private static final long serialVersionUID = 1L;
		//       private volatile boolean isRunning = true;
		//       private final Random random = new Random();
		//
		//       @Override
		//       public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
		//           while (isRunning) {
		//               TimeUnit.SECONDS.sleep(1);
		//               ctx.collect(Tuple2.of(TYPE[random.nextInt(TYPE.length)], 1));
		//           }
		//       }
		//
		//       @Override
		//       public void cancel() {
		//           isRunning = false;
		//       }
		//
		//   }, "random-source");

		// Key the stream using the selector's rule, then reduce per key. Per the
		// original note, the reducer adds the previous element to the current one
		// and passes the sum on for the next element. The reduced stream is printed.
		orders
				.keyBy(new Tuple2Selector())
				.reduce(new AddReduce())
				.print();

		env.execute("Flink Streaming Java API Skeleton");
	}
}